/*-------------------------------------------------------------------------
 *
 * stage_protocol.c
 *
 * Routines for staging PostgreSQL table data as shards into the distributed
 * cluster. These user-defined functions are similar to the psql-side \stage
 * command, but also differ from them in that users stage data from tables and
 * not files, and that they can also append to existing shards.
 *
 * Copyright (c) Citus Data, Inc.
 *
 * $Id$
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "funcapi.h"
#include "libpq/libpq-fe.h"
#include "miscadmin.h"

#include "access/htup.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_partition_fn.h"
#include "commands/tablecmds.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/syscache.h"

#include "distributed/adaptive_executor.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/connection_management.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/distributed_planner.h"
#include "distributed/foreign_key_relationship.h"
#include "distributed/hash_helpers.h"
#include "distributed/listutils.h"
#include "distributed/lock_graph.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_utility.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/placement_connection.h"
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/session_ctx.h"
#include "distributed/transaction_management.h"
#include "distributed/version_compat.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"

/* Local functions forward declarations */
static List* RelationShardListForShardCreate(ShardInterval* shardInterval);
static bool WorkerShardStats(ShardPlacement* placement, Oid relationId,
                             const char* shardName, uint64* shardSize);
static void UpdateTableStatistics(Oid relationId);
static void ReceiveAndUpdateShardsSizes(List* connectionList);
static void UpdateShardSize(uint64 shardId, ShardInterval* shardInterval, Oid relationId,
                            List* shardPlacementList, uint64 shardSize);
static bool ProcessShardStatisticsRow(PGresult* result, int64 rowIndex, uint64* shardId,
                                      uint64* shardSize);

/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_create_empty_shard);
PG_FUNCTION_INFO_V1(spq_update_shard_statistics);
PG_FUNCTION_INFO_V1(spq_update_table_statistics);

extern "C" Datum master_create_empty_shard(PG_FUNCTION_ARGS);
extern "C" Datum spq_update_shard_statistics(PG_FUNCTION_ARGS);
extern "C" Datum spq_update_table_statistics(PG_FUNCTION_ARGS);

/*
 * master_create_empty_shard creates an empty shard for the given distributed
 * table. The function first updates metadata on the coordinator node to make
 * this shard visible. Then it creates empty shard on worker node and added
 * shard placement row to metadata table.
 */
Datum master_create_empty_shard(PG_FUNCTION_ARGS)
{
    CheckCitusVersion(ERROR);

    text* relationNameText = PG_GETARG_TEXT_P(0);
    char* relationName = text_to_cstring(relationNameText);
    uint32 attemptableNodeCount = 0;
    ObjectAddress* tableAddress =
        static_cast<ObjectAddress*>(palloc0(sizeof(ObjectAddress)));

    uint32 candidateNodeIndex = 0;
    List* candidateNodeList = NIL;
    text* nullMinValue = NULL;
    text* nullMaxValue = NULL;
    char storageType = SHARD_STORAGE_TABLE;

    Oid relationId = ResolveRelationId(relationNameText, false);

    EnsureTablePermissions(relationId, ACL_INSERT);
    CheckDistributedTable(relationId);

    /*
     * distributed tables might have dependencies on different objects, since we create
     * shards for a distributed table via multiple sessions these objects will be created
     * via their own connection and committed immediately so they become visible to all
     * sessions creating shards.
     */
    ObjectAddressSet(*tableAddress, RelationRelationId, relationId);
    EnsureAllObjectDependenciesExistOnAllNodes(list_make1(tableAddress));
    EnsureReferenceTablesExistOnAllNodes();

    /* don't allow the table to be dropped */
    LockRelationOid(relationId, AccessShareLock);

    /* don't allow concurrent node list changes that require an exclusive lock */
    LockRelationOid(DistNodeRelationId(), RowShareLock);

    /* set the storage type of foreign tables to 'f' */
    if (IsForeignTable(relationId)) {
        storageType = SHARD_STORAGE_FOREIGN;
    }

    if (IsCitusTableType(relationId, HASH_DISTRIBUTED)) {
        ereport(ERROR,
                (errmsg("relation \"%s\" is a hash partitioned table", relationName),
                 errdetail("We currently don't support creating shards "
                           "on hash-partitioned tables")));
    } else if (IsCitusTableType(relationId, SINGLE_SHARD_DISTRIBUTED)) {
        ereport(ERROR, (errmsg("relation \"%s\" is a single shard table", relationName),
                        errdetail("We currently don't support creating shards "
                                  "on single shard tables")));
    } else if (IsCitusTableType(relationId, REFERENCE_TABLE)) {
        ereport(ERROR, (errmsg("relation \"%s\" is a reference table", relationName),
                        errdetail("We currently don't support creating shards "
                                  "on reference tables")));
    } else if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) {
        ereport(ERROR, (errmsg("relation \"%s\" is a local table", relationName),
                        errdetail("We currently don't support creating shards "
                                  "on local tables")));
    }

    /* generate new and unique shardId from sequence */
    uint64 shardId = GetNextShardId();

    /* if enough live groups, add an extra candidate node as backup */
    List* workerNodeList = DistributedTablePlacementNodeList(NoLock);

    if (list_length(workerNodeList) > Session_ctx::Vars().ShardReplicationFactor) {
        attemptableNodeCount = Session_ctx::Vars().ShardReplicationFactor + 1;
    } else {
        attemptableNodeCount = Session_ctx::Vars().ShardReplicationFactor;
    }

    /* first retrieve a list of random nodes for shard placements */
    while (candidateNodeIndex < attemptableNodeCount) {
        WorkerNode* candidateNode =
            WorkerGetRoundRobinCandidateNode(workerNodeList, shardId, candidateNodeIndex);
        if (candidateNode == NULL) {
            ereport(ERROR, (errmsg("could only find %u of %u possible nodes",
                                   candidateNodeIndex, attemptableNodeCount)));
        }

        candidateNodeList = lappend(candidateNodeList, candidateNode);
        candidateNodeIndex++;
    }

    InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue);

    CreateAppendDistributedShardPlacements(relationId, shardId, candidateNodeList,
                                           Session_ctx::Vars().ShardReplicationFactor);

    PG_RETURN_INT64(shardId);
}

/*
 * spq_update_shard_statistics updates metadata (shard size and shard min/max
 * values) of the given shard and returns the updated shard size.
 */
Datum spq_update_shard_statistics(PG_FUNCTION_ARGS)
{
    CheckCitusVersion(ERROR);

    int64 shardId = PG_GETARG_INT64(0);

    uint64 shardSize = UpdateShardStatistics(shardId);

    PG_RETURN_INT64(shardSize);
}

/*
 * spq_update_table_statistics updates metadata (shard size and shard min/max
 * values) of the shards of the given table
 */
Datum spq_update_table_statistics(PG_FUNCTION_ARGS)
{
    CheckCitusVersion(ERROR);

    Oid distributedTableId = PG_GETARG_OID(0);

    /*
     * Ensure the table still exists by trying to acquire a lock on it
     * If function returns NULL, it means the table doesn't exist
     * hence we should skip
     */
    Relation relation = try_relation_open(distributedTableId, AccessShareLock);

    if (relation != NULL) {
        UpdateTableStatistics(distributedTableId);

        /*
         * We release the lock here since spq_update_table_statistics
         * is usually used in the following command:
         * SELECT spq_update_table_statistics(logicalrelid) FROM pg_dist_partition;
         * In this way we avoid holding the locks on distributed tables for a long time:
         * If we close the relation with NoLock, the locks on the distributed tables will
         * be held until the above command is finished (all distributed tables are
         * updated).
         */
        relation_close(relation, AccessShareLock);
    } else {
        ereport(NOTICE, (errmsg("relation with OID %u does not exist, skipping",
                                distributedTableId)));
    }

    PG_RETURN_VOID();
}

/*
 * CheckDistributedTable checks if the given relationId corresponds to a
 * distributed table. If it does not, the function errors out.
 */
void CheckDistributedTable(Oid relationId)
{
    char* relationName = get_rel_name(relationId);

    /* check that the relationId belongs to a table */
    EnsureRelationKindSupported(relationId);

    if (!IsCitusTable(relationId)) {
        ereport(ERROR,
                (errmsg("relation \"%s\" is not a distributed table", relationName)));
    }
}

/*
 * CreateAppendDistributedShardPlacements creates shards for append distributed
 * tables on worker nodes. After successfully creating shard on the worker,
 * shard placement rows are added to the metadata.
 */
void CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
                                            List* workerNodeList, int replicationFactor)
{
    int attemptCount = replicationFactor;
    int workerNodeCount = list_length(workerNodeList);
    int placementsCreated = 0;
    IncludeSequenceDefaults includeSequenceDefaults = NO_SEQUENCE_DEFAULTS;
    IncludeIdentities includeIdentityDefaults = NO_IDENTITY;

    bool creatingShellTableOnRemoteNode = false;
    List* ddlCommandList = GetFullTableCreationCommands(
        relationId, includeSequenceDefaults, includeIdentityDefaults,
        creatingShellTableOnRemoteNode);
    uint32 connectionFlag = FOR_DDL;
    char* relationOwner = TableOwner(relationId);

    /* if we have enough nodes, add an extra placement attempt for backup */
    if (workerNodeCount > replicationFactor) {
        attemptCount++;
    }

    for (int attemptNumber = 0; attemptNumber < attemptCount; attemptNumber++) {
        int workerNodeIndex = attemptNumber % workerNodeCount;
        WorkerNode* workerNode = (WorkerNode*)list_nth(workerNodeList, workerNodeIndex);

        if (NodeIsCoordinator(workerNode)) {
            ereport(
                NOTICE,
                (errmsg("Creating placements for the append partitioned tables on the "
                        "coordinator is not supported, skipping coordinator ...")));
            continue;
        }

        uint32 nodeGroupId = workerNode->groupId;
        char* nodeName = workerNode->workerName;
        uint32 nodePort = workerNode->workerPort;
        const uint64 shardSize = 0;
        MultiConnection* connection = GetNodeUserDatabaseConnection(
            connectionFlag, nodeName, nodePort, relationOwner, NULL);

        if (PQstatus(connection->pgConn) != CONNECTION_OK) {
            ereport(WARNING,
                    (errmsg("could not connect to node \"%s:%u\"", nodeName, nodePort)));

            continue;
        }

        List* commandList =
            WorkerCreateShardCommandList(relationId, shardId, ddlCommandList);

        ExecuteCriticalRemoteCommandList(connection, commandList);

        InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardSize, nodeGroupId);
        placementsCreated++;

        if (placementsCreated >= replicationFactor) {
            break;
        }
    }

    /* check if we created enough shard replicas */
    if (placementsCreated < replicationFactor) {
        ereport(ERROR, (errmsg("could only create %u of %u of required shard replicas",
                               placementsCreated, replicationFactor)));
    }
}

/*
 * InsertShardPlacementRows inserts shard placements to the metadata table on
 * the coordinator node.
 */
void InsertShardPlacementRows(Oid relationId, int64 shardId, List* workerNodeList,
                              int workerStartIndex, int replicationFactor)
{
    int workerNodeCount = list_length(workerNodeList);

    for (int placementIndex = 0; placementIndex < replicationFactor; placementIndex++) {
        int workerNodeIndex = (workerStartIndex + placementIndex) % workerNodeCount;
        WorkerNode* workerNode = (WorkerNode*)list_nth(workerNodeList, workerNodeIndex);
        uint32 nodeGroupId = workerNode->groupId;
        const uint64 shardSize = 0;

        InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardSize, nodeGroupId);
    }
}

/*
 * CreateShardsOnWorkers creates shards on worker nodes given the shard placements
 * as a parameter. The function creates the shards via the executor. This means
 * that it can adopt the number of connections required to create the shards.
 */
void CreateShardsOnWorkers(Oid distributedRelationId, List* shardPlacements,
                           bool useExclusiveConnection)
{
    IncludeSequenceDefaults includeSequenceDefaults = NO_SEQUENCE_DEFAULTS;
    IncludeIdentities includeIdentityDefaults = NO_IDENTITY;

    bool creatingShellTableOnRemoteNode = false;
    List* ddlCommandList = GetFullTableCreationCommands(
        distributedRelationId, includeSequenceDefaults, includeIdentityDefaults,
        creatingShellTableOnRemoteNode);

    int taskId = 1;
    List* taskList = NIL;
    int poolSize = 1;

    ShardPlacement* shardPlacement = NULL;
    foreach_declared_ptr(shardPlacement, shardPlacements)
    {
        uint64 shardId = shardPlacement->shardId;
        ShardInterval* shardInterval = LoadShardInterval(shardId);
        List* relationShardList = RelationShardListForShardCreate(shardInterval);

        List* commandList =
            WorkerCreateShardCommandList(distributedRelationId, shardId, ddlCommandList);

        Task* task = CitusMakeNode(Task);
        task->jobId = INVALID_JOB_ID;
        task->taskId = taskId++;
        task->taskType = DDL_TASK;
        SetTaskQueryStringList(task, commandList);

        task->replicationModel = REPLICATION_MODEL_INVALID;
        task->dependentTaskList = NIL;
        task->anchorShardId = shardId;
        task->relationShardList = relationShardList;
        task->taskPlacementList = list_make1(shardPlacement);

        taskList = lappend(taskList, task);
    }

    if (useExclusiveConnection) {
        /*
         * When the table has local data, we force max parallelization so data
         * copy is done efficiently. We also prefer to use max parallelization
         * when we're inside a transaction block because the user might execute
         * compute heavy commands (e.g., load data or create index) later in the
         * transaction block.
         */
        SetLocalForceMaxQueryParallelization();

        /*
         * TODO: After we fix adaptive executor to record parallel access for
         * ForceMaxQueryParallelization, we should remove this. This is just
         * to force adaptive executor to record parallel access to relations.
         *
         * Adaptive executor uses poolSize to decide if it should record parallel
         * access to relations or not, and it ignores ForceMaxQueryParallelization
         * because of some complications in TRUNCATE.
         */
        poolSize = Session_ctx::Vars().MaxAdaptiveExecutorPoolSize;
    }
    bool localExecutionSupported = true;
    ExecuteUtilityTaskListExtended(taskList, poolSize, localExecutionSupported);
}

/*
 * RelationShardListForShardCreate gets a shard interval and returns the placement
 * accesses that would happen when a placement of the shard interval is created.
 */
static List* RelationShardListForShardCreate(ShardInterval* shardInterval)
{
    Oid relationId = shardInterval->relationId;
    CitusTableCacheEntry* cacheEntry = GetCitusTableCacheEntry(relationId);
    List* referencedRelationList = cacheEntry->referencedRelationsViaForeignKey;
    List* referencingRelationList = cacheEntry->referencingRelationsViaForeignKey;
    int shardIndex = -1;

    /* list_concat_*() modifies the first arg, so make a copy first */
    List* allForeignKeyRelations = list_copy(referencedRelationList);
    allForeignKeyRelations =
        list_concat_unique_oid(allForeignKeyRelations, referencingRelationList);

    /* record the placement access of the shard itself */
    RelationShard* relationShard = CitusMakeNode(RelationShard);
    relationShard->relationId = relationId;
    relationShard->shardId = shardInterval->shardId;
    List* relationShardList = list_make1(relationShard);

    if ((IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) ||
         IsCitusTableTypeCacheEntry(cacheEntry, SINGLE_SHARD_DISTRIBUTED)) &&
        cacheEntry->colocationId != INVALID_COLOCATION_ID) {
        shardIndex = ShardIndex(shardInterval);
    }

    /* all foregin key constraint relations */
    Oid fkeyRelationid = InvalidOid;
    foreach_declared_oid(fkeyRelationid, allForeignKeyRelations)
    {
        uint64 fkeyShardId = INVALID_SHARD_ID;

        if (!IsCitusTable(fkeyRelationid)) {
            /* we're not interested in local tables */
            continue;
        }

        if (IsCitusTableType(fkeyRelationid, REFERENCE_TABLE)) {
            fkeyShardId = GetFirstShardId(fkeyRelationid);
        } else if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) &&
                   IsCitusTableType(fkeyRelationid, HASH_DISTRIBUTED)) {
            /* hash distributed tables should be colocated to have fkey */
            Assert(TableColocationId(fkeyRelationid) == cacheEntry->colocationId);

            fkeyShardId = ColocatedShardIdInRelation(fkeyRelationid, shardIndex);
        } else {
            /*
             * We currently do not support foreign keys from/to local tables or
             * non-colocated tables when creating shards. Also note that shard
             * creation via shard moves doesn't happen in a transaction block,
             * so not relevant here.
             */
            continue;
        }

        RelationShard* fkeyRelationShard = CitusMakeNode(RelationShard);
        fkeyRelationShard->relationId = fkeyRelationid;
        fkeyRelationShard->shardId = fkeyShardId;

        relationShardList = lappend(relationShardList, fkeyRelationShard);
    }

    /* if partitioned table, make sure to record the parent table */
    if (PartitionTable(relationId)) {
        RelationShard* parentRelationShard = CitusMakeNode(RelationShard);

        /* partitioned tables are always co-located */
        Assert(shardIndex != -1);

        parentRelationShard->relationId = PartitionParentOid(relationId);
        parentRelationShard->shardId =
            ColocatedShardIdInRelation(parentRelationShard->relationId, shardIndex);

        relationShardList = lappend(relationShardList, parentRelationShard);
    }

    return relationShardList;
}

/*
 * WorkerCreateShardCommandList returns a list of DDL commands for the given
 * shardId to create the shard on the worker node.
 */
List* WorkerCreateShardCommandList(Oid relationId, uint64 shardId, List* ddlCommandList)
{
    List* commandList = NIL;
    Oid schemaId = get_rel_namespace(relationId);
    char* schemaName = get_namespace_name(schemaId);

    TableDDLCommand* ddlCommand = NULL;
    foreach_declared_ptr(ddlCommand, ddlCommandList)
    {
        Assert(CitusIsA(ddlCommand, TableDDLCommand));
        char* applyDDLCommand =
            GetShardedTableDDLCommand(ddlCommand, shardId, schemaName);
        commandList = lappend(commandList, applyDDLCommand);
    }

    ShardInterval* shardInterval = LoadShardInterval(shardId);

    commandList =
        list_concat(commandList, CopyShardForeignConstraintCommandList(shardInterval));

    return commandList;
}

/*
 * UpdateShardStatistics updates metadata (shard size and shard min/max values)
 * of the given shard and returns the updated shard size.
 */
uint64 UpdateShardStatistics(int64 shardId)
{
    ShardInterval* shardInterval = LoadShardInterval(shardId);
    Oid relationId = shardInterval->relationId;
    bool statsOK = false;
    uint64 shardSize = 0;

    /* Build shard qualified name. */
    char* shardName = get_rel_name(relationId);
    Oid schemaId = get_rel_namespace(relationId);
    char* schemaName = get_namespace_name(schemaId);

    AppendShardIdToName(&shardName, shardId);

    char* shardQualifiedName = quote_qualified_identifier(schemaName, shardName);

    List* shardPlacementList = ActiveShardPlacementList(shardId);

    /* get shard's statistics from a shard placement */
    ShardPlacement* placement = NULL;
    foreach_declared_ptr(placement, shardPlacementList)
    {
        statsOK = WorkerShardStats(placement, relationId, shardQualifiedName, &shardSize);
        if (statsOK) {
            break;
        }
    }

    /*
     * If for some reason we appended data to a shard, but failed to retrieve
     * statistics we just WARN here to avoid losing shard-state updates. Note
     * that this means we will return 0 as the shard fill-factor, and this shard
     * also won't be pruned as the statistics will be empty. If the failure was
     * transient, a subsequent append call will fetch the correct statistics.
     */
    if (!statsOK) {
        ereport(WARNING,
                (errmsg("could not get statistics for shard %s", shardQualifiedName),
                 errdetail("Setting shard statistics to NULL")));
    }

    UpdateShardSize(shardId, shardInterval, relationId, shardPlacementList, shardSize);

    return shardSize;
}

/*
 * UpdateTableStatistics updates metadata (shard size and shard min/max values)
 * of the shards of the given table. Follows a similar logic to spq_shard_sizes function.
 */
static void UpdateTableStatistics(Oid relationId)
{
    List* citusTableIds = NIL;
    citusTableIds = lappend_oid(citusTableIds, relationId);

    /* we want to use a distributed transaction here to detect distributed deadlocks */
    bool useDistributedTransaction = true;

    List* connectionList =
        SendShardStatisticsQueriesInParallel(citusTableIds, useDistributedTransaction);

    ReceiveAndUpdateShardsSizes(connectionList);
}

/*
 * ReceiveAndUpdateShardsSizes receives shard id and size
 * results from the given connection list, and updates
 * respective entries in pg_dist_placement.
 */
static void ReceiveAndUpdateShardsSizes(List* connectionList)
{
    /*
     * From the connection list, we will not get all the shards, but
     * all the placements. We use a hash table to remember already visited shard ids
     * since we update all the different placements of a shard id at once.
     */
    HTAB* alreadyVisitedShardPlacements =
        CreateSimpleHashSetWithName(Oid, "oid visited hash set");

    MultiConnection* connection = NULL;
    foreach_declared_ptr(connection, connectionList)
    {
        if (PQstatus(connection->pgConn) != CONNECTION_OK) {
            continue;
        }

        bool raiseInterrupts = true;
        PGresult* result = GetRemoteCommandResult(connection, raiseInterrupts);
        if (!IsResponseOK(result)) {
            ReportResultError(connection, result, WARNING);
            continue;
        }

        int64 rowCount = PQntuples(result);
        int64 colCount = PQnfields(result);

        /* Although it is not expected */
        if (colCount != SHARD_SIZES_COLUMN_COUNT) {
            ereport(WARNING, (errmsg("unexpected number of columns from "
                                     "spq_update_table_statistics")));
            continue;
        }

        for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++) {
            uint64 shardId = 0;
            uint64 shardSize = 0;

            if (!ProcessShardStatisticsRow(result, rowIndex, &shardId, &shardSize)) {
                /* this row has no valid shard statistics */
                continue;
            }

            if (OidVisited(alreadyVisitedShardPlacements, shardId)) {
                /* We have already updated this placement list */
                continue;
            }

            VisitOid(alreadyVisitedShardPlacements, shardId);

            ShardInterval* shardInterval = LoadShardInterval(shardId);
            Oid relationId = shardInterval->relationId;
            List* shardPlacementList = ActiveShardPlacementList(shardId);

            UpdateShardSize(shardId, shardInterval, relationId, shardPlacementList,
                            shardSize);
        }
        PQclear(result);
        ForgetResults(connection);
    }
    hash_destroy(alreadyVisitedShardPlacements);
}

/*
 * ProcessShardStatisticsRow processes a row of shard statistics of the input PGresult
 * - it returns true if this row belongs to a valid shard
 * - it returns false if this row has no valid shard statistics (shardId =
 * INVALID_SHARD_ID)
 *
 * Input tuples are assumed to be of the form:
 * (shard_id bigint, shard_name text, shard_size bigint)
 */
static bool ProcessShardStatisticsRow(PGresult* result, int64 rowIndex, uint64* shardId,
                                      uint64* shardSize)
{
    *shardId = ParseIntField(result, rowIndex, 0);

    /* check for the dummy entries we put so that UNION ALL wouldn't complain */
    if (*shardId == INVALID_SHARD_ID) {
        /* this row has no valid shard statistics */
        return false;
    }

    *shardSize = ParseIntField(result, rowIndex, 1);
    return true;
}

/*
 * UpdateShardSize updates the shardlength (shard size) of the given
 * shard and its placements in pg_dist_placement.
 */
static void UpdateShardSize(uint64 shardId, ShardInterval* shardInterval, Oid relationId,
                            List* shardPlacementList, uint64 shardSize)
{
    ShardPlacement* placement = NULL;

    /* update metadata for each shard placement */
    foreach_declared_ptr(placement, shardPlacementList)
    {
        uint64 placementId = placement->placementId;
        int32 groupId = placement->groupId;

        DeleteShardPlacementRow(placementId);
        InsertShardPlacementRow(shardId, placementId, shardSize, groupId);
    }
}

/*
 * WorkerShardStats queries the worker node, and retrieves shard statistics that
 * we assume have changed after new table data have been appended to the shard.
 */
static bool WorkerShardStats(ShardPlacement* placement, Oid relationId,
                             const char* shardName, uint64* shardSize)
{
    StringInfo tableSizeQuery = makeStringInfo();
    PGresult* queryResult = NULL;
    char* tableSizeStringEnd = NULL;

    int connectionFlags = 0;

    MultiConnection* connection =
        GetPlacementConnection(connectionFlags, placement, NULL);

    /*
     * This code-path doesn't support optional connections, so we don't expect
     * NULL connections.
     */
    Assert(connection != NULL);

    *shardSize = 0;

    char* quotedShardName = quote_literal_cstr(shardName);
    appendStringInfo(tableSizeQuery, SHARD_TABLE_SIZE_QUERY, quotedShardName);

    int executeCommand =
        ExecuteOptionalRemoteCommand(connection, tableSizeQuery->data, &queryResult);
    if (executeCommand != 0) {
        return false;
    }

    char* tableSizeString = PQgetvalue(queryResult, 0, 0);
    if (tableSizeString == NULL) {
        PQclear(queryResult);
        ForgetResults(connection);
        return false;
    }

    errno = 0;
    uint64 tableSize = std::strtoull(tableSizeString, &tableSizeStringEnd, 0);
    if (errno != 0 || (*tableSizeStringEnd) != '\0') {
        PQclear(queryResult);
        ForgetResults(connection);
        return false;
    }

    *shardSize = tableSize;

    PQclear(queryResult);
    ForgetResults(connection);

    return true;
}

/*
 * ForeignConstraintGetReferencedTableId parses given foreign constraint query and
 * extracts referenced table id from it.
 */
Oid ForeignConstraintGetReferencedTableId(const char* queryString)
{
    Node* queryNode = ParseTreeNode(queryString);
    AlterTableStmt* foreignConstraintStmt = (AlterTableStmt*)queryNode;
    AlterTableCmd* command = (AlterTableCmd*)linitial(foreignConstraintStmt->cmds);

    if (command->subtype == AT_AddConstraint) {
        Constraint* constraint = (Constraint*)command->def;
        if (constraint->contype == CONSTR_FOREIGN) {
            RangeVar* referencedTable = constraint->pktable;

            return RangeVarGetRelid(referencedTable, NoLock,
                                    foreignConstraintStmt->missing_ok);
        }
    }

    return InvalidOid;
}
